package io.mqttpush.mqttclient;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;

/**
 * 
 * @author tianzhenjiu
 *
 */
@Sharable
public class ConnectionHandle extends ChannelInboundHandlerAdapter {

	
	Logger logger=LoggerFactory.getLogger(getClass());
	
	
	String deviceId;
	String username;
	String password;
	Integer pingTime=59;
	CancelbleExecutorService executorService;
	
	PingRunnable pingRunnable;
	
	
	Function<Channel,ChannelFuture> reConnectFunc;
	

	Consumer<Channel>  successlogin;
			
	
	
	

	public ConnectionHandle(String deviceId, String username,
			String password, Integer pingTime,
			CancelbleExecutorService executorService,
			Function<Channel, ChannelFuture> reConnectFunc,
			Consumer<Channel>  successlogin) {
		super();
		this.deviceId = deviceId;
		this.username = username;
		this.password = password;
		this.pingTime = pingTime;
		this.executorService = executorService;
		this.reConnectFunc = reConnectFunc;		
		this.successlogin=successlogin;
	}

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		
		ChannelFuture channelFuture=login(ctx.channel(), deviceId, username, password);
		channelFuture.addListener((ChannelFuture localFuture)->{
			if(!localFuture.isSuccess()) {
				localFuture.cause().printStackTrace();
			}else {
				
				AtomicBoolean hasConnect=Status.hasConnect;
				hasConnect.set(true);
				pingRunnable=new PingRunnable(localFuture.channel(), reConnectFunc, pingTime,executorService);
				pingRunnable.updatehasResp(true);
				executorService.schedule(pingRunnable, pingTime, TimeUnit.SECONDS);
				
				
				if (logger.isDebugEnabled()) {
					logger.debug("连接成功");
				}
				
			}
		});
		ctx.channel().closeFuture().addListener((ChannelFuture future)->{
			reConnectFunc.apply(ctx.channel());
		});
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		logger.warn("链路异常"+ cause);
		ctx.fireExceptionCaught(cause);
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {

		logger.warn("链路关闭,将会重新连接");
		AtomicBoolean isLogin=Status.isLogin;
		isLogin.set(false);
		super.channelInactive(ctx);

	}
	
	/**
	 * 发送登录报文
	 * @param channel
	 * @param deviceId
	 * @param username
	 * @param password
	 * @return
	 */
	public ChannelFuture login(Channel channel,String deviceId,String username, String password) {
		
		MqttFixedHeader mqttFixedHeader=new MqttFixedHeader(MqttMessageType.CONNECT,
				false, MqttQoS.AT_MOST_ONCE, false, 0);
		
		MqttConnectVariableHeader variableHeader=new MqttConnectVariableHeader(
				MqttVersion.MQTT_3_1_1.protocolName(), MqttVersion.MQTT_3_1_1.protocolLevel(), true, true, false, 0,false, false, 10);
		
		MqttConnectPayload payload=new MqttConnectPayload(deviceId, null, null, username, password.getBytes());
		MqttConnectMessage connectMessage=new MqttConnectMessage(mqttFixedHeader, variableHeader, payload);
		return channel.writeAndFlush(connectMessage);
	}
	
	
	

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

		if (msg instanceof MqttMessage) {

			MqttMessage message = (MqttMessage) msg;
			MqttFixedHeader fixedHeader = message.fixedHeader();
			MqttMessageType messageType = fixedHeader.messageType();

			switch (messageType) {
			
			case DISCONNECT:
				ctx.close();
				break;
			case CONNACK:
				ack(ctx, (MqttConnAckMessage) message);
		
			default:// 如果有消息来了就置为可用,因为这里的default匹配的一定是其publish 或sub消息类型
				whenPongResp();
				ctx.fireChannelRead(msg);
				break;
			}
		} else
			ctx.close();

	}

	public void ack(ChannelHandlerContext ctx, MqttConnAckMessage ackMessage) {

		
		switch (ackMessage.variableHeader().connectReturnCode()) {
		
		case CONNECTION_ACCEPTED:
			AtomicBoolean isLogin=Status.isLogin;
			isLogin.set(true);
			logger.info("登录成功");
			if(successlogin!=null) {
				successlogin.accept(ctx.channel());
			}
			break;
		default: 
			if (logger.isDebugEnabled()) {
				// 登录失败
				logger.warn("登录失败" + ackMessage.variableHeader().connectReturnCode());
			}
			break;
		}
	}
	
	
	/**
	 * 当收到pong报文的时候
	 */
	public void whenPongResp() {
		if(pingRunnable!=null) {			
			pingRunnable.updatehasResp(true);
		}
		else {
			logger.warn("为什么pingrunnable还是空的?");
		}
	}
	

	public PingRunnable getPingRunnable() {
		return pingRunnable;
	}

	
	
	
}
